/**
 * Copyright (c) 2023 murenchao
 * taomu is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.mqtt.mqttv3

import com.google.gson.Gson
import cool.taomu.mqtt.mqttv3.Topic.MessageType
import cool.taomu.mqtt.mqttv3.Topic.QoS
import cool.taomu.utils.crypto.Base64
import java.util.ArrayList
import java.util.UUID
import java.util.concurrent.ExecutorService
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.slf4j.LoggerFactory

/**
 * Static helpers for publishing to and subscribing from an MQTT 3.1.1 broker
 * through the Eclipse Paho synchronous client.
 * 
 * Subscriptions are described declaratively: callback classes carry a {@code Topic}
 * (or a {@code Topics} container) annotation and, optionally, a {@code Will} annotation.
 */
class MqttV3Service {
	val static LOG = LoggerFactory.getLogger(MqttV3Service);

	/**
	 * Starts one subscription task per callback class on the given executor.
	 * 
	 * For each non-null callback class, the {@code Topics} annotation is read first and every
	 * contained topic whose message type is SUBSCRIBER is subscribed; when {@code Topics} is
	 * absent the single {@code Topic} annotation is used instead.
	 * 
	 * @param es executor that runs the (blocking) connect/subscribe work
	 * @param host broker host name or address
	 * @param port broker port
	 * @param username broker user name; null or blank means no credentials are sent
	 * @param password broker password, only used together with a non-blank user name
	 * @param callbacks callback classes to instantiate and register; null entries are skipped
	 */
	def static void subscriber(ExecutorService es, String host, int port, String username, String password,
		Class<? extends MqttCallback> ... callbacks) {
		callbacks.filterNull.forEach [ callback |
			es.execute([
				try {
					val topics = callback.getAnnotation(Topics);
					if (topics !== null) {
						topics.value.filterNull.filter[it.messageType == MessageType.SUBSCRIBER].forEach [ topic |
							subscriber(topic, host, port, username, password, callback);
						]
					} else {
						// May be null when the class carries no annotation; the private overload handles that.
						val topic = callback.getAnnotation(Topic);
						subscriber(topic, host, port, username, password, callback);
					}
				} catch (Exception ex) {
					// Route failures to the logger instead of letting them die with the worker thread.
					LOG.error("mqtt subscribe failed for " + callback.name, ex);
				}
			]);
		]
	}

	/**
	 * Publishes a payload using the settings of a {@code Topic} annotation.
	 * 
	 * Only the first topic name and the first QoS entry of the annotation are used. Nothing is
	 * sent when the annotation is null, is not of message type SENDER, or declares no topic name.
	 * A missing QoS entry falls back to AT_MOST_ONCE, matching the padding used when subscribing.
	 * 
	 * @param topic annotation describing the target topic and connection settings
	 * @param host broker host name or address
	 * @param port broker port
	 * @param username broker user name; null or blank means no credentials are sent
	 * @param password broker password
	 * @param payload raw bytes to publish
	 */
	static def void sender(Topic topic, String host, int port, String username, String password, byte[] payload) {
		if (topic === null || topic.messageType != MessageType.SENDER) {
			return;
		}
		if (topic.value.size == 0) {
			LOG.warn("mqtt sender: Topic annotation declares no topic name, nothing sent");
			return;
		}
		val qos = if (topic.qos.size > 0) topic.qos.get(0).ordinal else QoS.AT_MOST_ONCE.ordinal;
		sender(topic.value.get(0), topic.timeout, topic.isSsl, topic.clientId, qos, topic.retain, host, port,
			username, password, payload);
	}

	/**
	 * Publishes a payload to a topic with default settings: plain TCP, a random client id,
	 * QoS 1 and the retained flag set.
	 * 
	 * NOTE(review): the default timeout of 6000 is handed to Paho's setConnectionTimeout, which
	 * is documented in seconds; this looks like it was meant as milliseconds - confirm.
	 */
	static def void sender(String topic, String host, int port, String username, String password, byte[] payload) {
		sender(topic, 6000, false, UUID.randomUUID.toString, 1, true, host, port, username, password, payload);
	}

	/**
	 * Connects to the broker, publishes one message and disconnects again.
	 * 
	 * The payload is passed through the project's Base64 helper before it is published.
	 * Any failure is logged and not propagated to the caller.
	 * 
	 * @param topic topic name to publish to
	 * @param timeout connection timeout handed to Paho
	 * @param isSsl true to use the "ssl" URI scheme, false for "tcp"
	 * @param clientId MQTT client identifier
	 * @param qos MQTT quality-of-service level
	 * @param retain whether the broker should retain the message
	 * @param host broker host name or address
	 * @param port broker port
	 * @param username broker user name; null or blank means no credentials are sent
	 * @param password broker password, only used together with a non-blank user name
	 * @param payload raw bytes to publish
	 */
	static def void sender(String topic, int timeout, boolean isSsl, String clientId, int qos, boolean retain,
		String host, int port, String username, String password, byte[] payload) {
		try (var memoryPersistence = new MemoryPersistence()) {
			val mqttConnectOptions = new MqttConnectOptions();
			mqttConnectOptions.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
			mqttConnectOptions.setCleanSession(true);
			// Same credential rule as the subscriber side: a blank user name means "anonymous".
			if (username !== null && !username.trim.equals("")) {
				mqttConnectOptions.setUserName(username);
				if (password !== null) {
					mqttConnectOptions.setPassword(password.toCharArray());
				}
			}
			mqttConnectOptions.setConnectionTimeout(timeout);
			mqttConnectOptions.setKeepAliveInterval(20);
			val mqttClient = new MqttClient(String.format("%s://%s:%d", isSsl ? "ssl" : "tcp", host, port), clientId,
				memoryPersistence);
			try {
				mqttClient.connect(mqttConnectOptions);
				val mqttMessage = new MqttMessage(new Base64(payload).encode());
				mqttMessage.setQos(qos);
				mqttMessage.setRetained(retain);
				mqttClient.publish(topic, mqttMessage);
				LOG.debug("clientId: {} 发送成功", clientId);
			} finally {
				// Always release the connection, even when connect or publish failed.
				try {
					if (mqttClient.isConnected) {
						mqttClient.disconnect;
					}
					mqttClient.close;
				} catch (Exception closeEx) {
					LOG.debug("mqtt client cleanup failed", closeEx);
				}
			}
		} catch (Exception ex) {
			LOG.error("mqtt 异常:", ex);
		}
	}

	/**
	 * Creates a client for one {@code Topic} annotation, wires up a new instance of the callback
	 * class, connects and subscribes to all topic names of the annotation.
	 * 
	 * Does nothing when the annotation is null or is not of message type SUBSCRIBER. The client id
	 * is taken from the annotation unless it is the literal "uuid", in which case a random UUID is
	 * used. A shutdown hook unsubscribes and disconnects the client when the JVM exits.
	 * 
	 * @param topic annotation describing topic names, QoS levels and connection settings
	 * @param host broker host name or address
	 * @param port broker port
	 * @param username broker user name; null or blank means no credentials are sent
	 * @param password broker password, only used together with a non-blank user name
	 * @param callback callback class; instantiated through its no-argument constructor
	 */
	private static def void subscriber(Topic topic, String host, int port, String username, String password,
		Class<? extends MqttCallback> callback) {
		// Check for a missing annotation before touching any of its members.
		if (topic === null) {
			LOG.warn("mqtt subscriber: {} has no Topic annotation, skipped", callback.name);
			return;
		}
		if (topic.messageType != MessageType.SUBSCRIBER) {
			return;
		}
		val will = callback.getAnnotation(Will);
		LOG.info("Topic : " + new Gson().toJson(topic.value));
		val hasCredentials = username !== null && !username.trim.equals("");

		val memoryPersistence = new MemoryPersistence();
		val mqttConnectOptions = new MqttConnectOptions();
		mqttConnectOptions.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
		mqttConnectOptions.setCleanSession(topic.cleanSession);
		if (hasCredentials) {
			mqttConnectOptions.setUserName(username);
			if (password !== null) {
				mqttConnectOptions.setPassword(password.toCharArray());
			}
		}
		mqttConnectOptions.setConnectionTimeout(topic.timeout);
		mqttConnectOptions.setKeepAliveInterval(topic.keepAlive);
		if (will !== null) {
			mqttConnectOptions.setWill(will.topic, will.message.getBytes("UTF-8"), will.qos, will.retain);
		}

		val broker = String.format("%s://%s:%d", topic.isSsl ? "ssl" : "tcp", host, port);
		// The literal "uuid" is a placeholder meaning "generate a random client id".
		val clientId = if (topic.clientId.equals("uuid")) UUID.randomUUID.toString else topic.clientId;
		val mqttClient = new MqttClient(broker, clientId, memoryPersistence);

		// Hand the callback everything it was given in the original wiring (client, options, endpoint).
		val mc = callback.newInstance;
		mc.setClient = mqttClient;
		mc.setOptions = mqttConnectOptions;
		mc.host = host;
		mc.port = port;
		if (hasCredentials) {
			mc.username = username;
			mc.password = password;
		}
		mqttClient.callback = mc;
		mqttClient.connect(mqttConnectOptions);

		// Paho requires exactly one QoS per topic filter: take the declared levels in order,
		// pad missing ones with AT_MOST_ONCE and ignore surplus entries.
		val int[] qoss = newIntArrayOfSize(topic.value.size);
		for (var i = 0; i < qoss.length; i++) {
			qoss.set(i, if (i < topic.qos.size) topic.qos.get(i).ordinal else QoS.AT_MOST_ONCE.ordinal);
		}
		mqttClient.subscribe(topic.value, qoss);

		Runtime.runtime.addShutdownHook(new Thread() {
			override run() {
				try {
					if (mqttClient.isConnected) {
						mqttClient.unsubscribe(topic.value);
						mqttClient.disconnect;
					}
				} catch (Exception ex) {
					LOG.warn("mqtt shutdown cleanup failed", ex);
				} finally {
					// Release the persistence store even when unsubscribe/disconnect failed.
					memoryPersistence.close;
				}
			}
		})
	}
}
